{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "p5JTeKfCVBZf"
   },
   "source": [
    "# Overview\n",
    "\n",
    "In this tutorial, we'll use Feast to generate training data and power online model inference for a \n",
    "ride-sharing driver satisfaction prediction model. Feast solves several common issues in this flow:\n",
    "\n",
    "1. **Training-serving skew and complex data joins:** Feature values often exist across multiple tables. Joining \n",
    "   these datasets can be complicated, slow, and error-prone.\n",
    "   * Feast joins these tables with battle-tested logic that ensures _point-in-time_ correctness so future feature \n",
    "     values do not leak to models.\n",
    "2. **Online feature availability:** At inference time, models often need access to features that aren't readily \n",
    "   available and need to be precomputed from other data sources.\n",
    "   * Feast manages deployment to a variety of online stores (e.g. DynamoDB, Redis, Google Cloud Datastore) and \n",
    "     ensures necessary features are consistently _available_ and _freshly computed_ at inference time.\n",
    "3. **Feature and model versioning:** Different teams within an organization are often unable to reuse \n",
    "   features across projects, resulting in duplicate feature creation logic. Models have data dependencies that need \n",
    "   to be versioned, for example when running A/B tests on model versions.\n",
    "   * Feast enables discovery of and collaboration on previously used features and enables versioning of sets of \n",
    "     features (via _feature services_).\n",
    "   * _(Experimental)_ Feast enables light-weight feature transformations so users can re-use transformation logic \n",
    "     across online / offline use cases and across models.\n",
    "\n",
    "We will:\n",
    "1. Deploy a local feature store with a **Parquet file offline store** and **Sqlite online store**.\n",
    "2. Build a training dataset using our time series features from our **Parquet files**.\n",
    "3. Materialize feature values from the offline store into the online store.\n",
    "4. Read the latest features from the online store for inference."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "9_Y997DzvOMI"
   },
   "source": [
    "## Step 1: Install Feast\n",
    "\n",
    "Install Feast using pip:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "rXNMAAJKQPG5"
   },
   "outputs": [],
   "source": [
    "%%sh\n",
    "pip install feast -U -q\n",
    "echo \"Please restart your runtime now (Runtime -> Restart runtime). This ensures that the correct dependencies are loaded.\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false,
    "id": "sOX_LwjaAhKz"
   },
   "source": [
    "**Reminder**: Please restart your runtime after installing Feast (Runtime -> Restart runtime). This ensures that the correct dependencies are loaded.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "OZetvs5xx4GP"
   },
   "source": [
    "## Step 2: Create a feature repository\n",
    "\n",
    "A feature repository is a directory that contains the configuration of the feature store and individual features. This configuration is written as code (Python/YAML) and it's highly recommended that teams track it centrally using git. See [Feature Repository](https://docs.feast.dev/reference/feature-repository) for a detailed explanation of feature repositories.\n",
    "\n",
    "The easiest way to create a new feature repository to use the `feast init` command. This creates a scaffolding with initial demo data.\n",
    "\n",
    "### Demo data scenario \n",
    "- We have surveyed some drivers for how satisfied they are with their experience in a ride-sharing app. \n",
    "- We want to generate predictions for driver satisfaction for the rest of the users so we can reach out to potentially dissatisfied users."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "IhirSkgUvYau",
    "outputId": "664367b9-6a2a-493d-fd78-6495fb459fa2"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "Creating a new Feast repository in \u001b[1m\u001b[32m/content/feature_repo\u001b[0m.\n",
      "\n"
     ]
    }
   ],
   "source": [
    "!feast init feature_repo"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "OdTASZPvyKCe"
   },
   "source": [
    "### Step 2a: Inspecting the feature repository\n",
    "\n",
    "Let's take a look at the demo repo itself. It breaks down into\n",
    "\n",
    "\n",
    "* `data/` contains raw demo parquet data\n",
    "* `example_repo.py` contains demo feature definitions\n",
    "* `feature_store.yaml` contains a demo setup configuring where data sources are\n",
    "* `test_workflow.py` showcases how to run all key Feast commands, including defining, retrieving, and pushing features.\n",
    "   * You can run this with `python test_workflow.py`.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "9jXuzt4ovzA3",
    "outputId": "9e326892-f0cc-4d86-d0b2-f33f822f83a9"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/content/feature_repo\n",
      "README.md          feature_store.yaml\n",
      "__init__.py        example_repo.py    test_workflow.py\n",
      "\n",
      "./data:\n",
      "driver_stats.parquet\n"
     ]
    }
   ],
   "source": [
    "%cd feature_repo/feature_repo\n",
    "!ls -R"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "MJk_WNsbeUP6"
   },
   "source": [
    "### Step 2b: Inspecting the project configuration\n",
    "Let's inspect the setup of the project in `feature_store.yaml`. \n",
    "\n",
    "The key line defining the overall architecture of the feature store is the **provider**. \n",
    "\n",
    "The provider value sets default offline and online stores. \n",
    "* The offline store provides the compute layer to process historical data (for generating training data & feature \n",
    "  values for serving). \n",
    "* The online store is a low latency store of the latest feature values (for powering real-time inference).\n",
    "\n",
    "Valid values for `provider` in `feature_store.yaml` are:\n",
    "\n",
    "* local: use file source with SQLite/Redis\n",
    "* gcp: use BigQuery/Snowflake with Google Cloud Datastore/Redis\n",
    "* aws: use Redshift/Snowflake with DynamoDB/Redis\n",
    "\n",
    "Note that there are many other offline / online stores Feast works with, including Azure, Hive, Trino, and PostgreSQL via community plugins. See https://docs.feast.dev/roadmap for all supported connectors.\n",
    "\n",
    "A custom setup can also be made by following [Customizing Feast](https://docs.feast.dev/v/master/how-to-guides/customizing-feast)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "9_YJ--uYdtcP",
    "outputId": "af56a8da-9ca2-4dd9-f73c-a60dd3e1613a"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[94mproject\u001b[39;49;00m:\u001b[37m \u001b[39;49;00mfeature_repo\u001b[37m\u001b[39;49;00m\n",
      "\u001b[37m# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)\u001b[39;49;00m\u001b[37m\u001b[39;49;00m\n",
      "\u001b[94mregistry\u001b[39;49;00m:\u001b[37m \u001b[39;49;00mdata/registry.db\u001b[37m\u001b[39;49;00m\n",
      "\u001b[37m# The provider primarily specifies default offline / online stores & storing the registry in a given cloud\u001b[39;49;00m\u001b[37m\u001b[39;49;00m\n",
      "\u001b[94mprovider\u001b[39;49;00m:\u001b[37m \u001b[39;49;00mlocal\u001b[37m\u001b[39;49;00m\n",
      "\u001b[94monline_store\u001b[39;49;00m:\u001b[37m\u001b[39;49;00m\n",
      "\u001b[37m    \u001b[39;49;00m\u001b[94mpath\u001b[39;49;00m:\u001b[37m \u001b[39;49;00mdata/online_store.db\u001b[37m\u001b[39;49;00m\n",
      "\u001b[94mentity_key_serialization_version\u001b[39;49;00m:\u001b[37m \u001b[39;49;00m2\u001b[37m\u001b[39;49;00m\n"
     ]
    }
   ],
   "source": [
    "!pygmentize feature_store.yaml"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "FnMlk4zshywp"
   },
   "source": [
    "### Inspecting the raw data\n",
    "\n",
    "The raw feature data we have in this demo is stored in a local parquet file. The dataset captures hourly stats of a driver in a ride-sharing app."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 423
    },
    "id": "sIF2lO59dwzi",
    "outputId": "8931930b-b32f-43e1-d45b-de230489c7b8"
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>event_timestamp</th>\n",
       "      <th>driver_id</th>\n",
       "      <th>conv_rate</th>\n",
       "      <th>acc_rate</th>\n",
       "      <th>avg_daily_trips</th>\n",
       "      <th>created</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>2022-07-24 14:00:00+00:00</td>\n",
       "      <td>1005</td>\n",
       "      <td>0.423913</td>\n",
       "      <td>0.082831</td>\n",
       "      <td>201</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>2022-07-24 15:00:00+00:00</td>\n",
       "      <td>1005</td>\n",
       "      <td>0.507126</td>\n",
       "      <td>0.427470</td>\n",
       "      <td>690</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>2022-07-24 16:00:00+00:00</td>\n",
       "      <td>1005</td>\n",
       "      <td>0.139810</td>\n",
       "      <td>0.129743</td>\n",
       "      <td>845</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>2022-07-24 17:00:00+00:00</td>\n",
       "      <td>1005</td>\n",
       "      <td>0.383574</td>\n",
       "      <td>0.071728</td>\n",
       "      <td>839</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>2022-07-24 18:00:00+00:00</td>\n",
       "      <td>1005</td>\n",
       "      <td>0.959131</td>\n",
       "      <td>0.440051</td>\n",
       "      <td>2</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1802</th>\n",
       "      <td>2022-08-08 12:00:00+00:00</td>\n",
       "      <td>1001</td>\n",
       "      <td>0.994883</td>\n",
       "      <td>0.020145</td>\n",
       "      <td>650</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1803</th>\n",
       "      <td>2022-08-08 13:00:00+00:00</td>\n",
       "      <td>1001</td>\n",
       "      <td>0.663844</td>\n",
       "      <td>0.864639</td>\n",
       "      <td>359</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1804</th>\n",
       "      <td>2021-04-12 07:00:00+00:00</td>\n",
       "      <td>1001</td>\n",
       "      <td>0.068696</td>\n",
       "      <td>0.624977</td>\n",
       "      <td>624</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1805</th>\n",
       "      <td>2022-08-01 02:00:00+00:00</td>\n",
       "      <td>1003</td>\n",
       "      <td>0.980869</td>\n",
       "      <td>0.244420</td>\n",
       "      <td>790</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1806</th>\n",
       "      <td>2022-08-01 02:00:00+00:00</td>\n",
       "      <td>1003</td>\n",
       "      <td>0.980869</td>\n",
       "      <td>0.244420</td>\n",
       "      <td>790</td>\n",
       "      <td>2022-08-08 14:14:11.200</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "<p>1807 rows × 6 columns</p>\n",
       "</div>"
      ],
      "text/plain": [
       "               event_timestamp  driver_id  conv_rate  acc_rate  \\\n",
       "0    2022-07-24 14:00:00+00:00       1005   0.423913  0.082831   \n",
       "1    2022-07-24 15:00:00+00:00       1005   0.507126  0.427470   \n",
       "2    2022-07-24 16:00:00+00:00       1005   0.139810  0.129743   \n",
       "3    2022-07-24 17:00:00+00:00       1005   0.383574  0.071728   \n",
       "4    2022-07-24 18:00:00+00:00       1005   0.959131  0.440051   \n",
       "...                        ...        ...        ...       ...   \n",
       "1802 2022-08-08 12:00:00+00:00       1001   0.994883  0.020145   \n",
       "1803 2022-08-08 13:00:00+00:00       1001   0.663844  0.864639   \n",
       "1804 2021-04-12 07:00:00+00:00       1001   0.068696  0.624977   \n",
       "1805 2022-08-01 02:00:00+00:00       1003   0.980869  0.244420   \n",
       "1806 2022-08-01 02:00:00+00:00       1003   0.980869  0.244420   \n",
       "\n",
       "      avg_daily_trips                 created  \n",
       "0                 201 2022-08-08 14:14:11.200  \n",
       "1                 690 2022-08-08 14:14:11.200  \n",
       "2                 845 2022-08-08 14:14:11.200  \n",
       "3                 839 2022-08-08 14:14:11.200  \n",
       "4                   2 2022-08-08 14:14:11.200  \n",
       "...               ...                     ...  \n",
       "1802              650 2022-08-08 14:14:11.200  \n",
       "1803              359 2022-08-08 14:14:11.200  \n",
       "1804              624 2022-08-08 14:14:11.200  \n",
       "1805              790 2022-08-08 14:14:11.200  \n",
       "1806              790 2022-08-08 14:14:11.200  \n",
       "\n",
       "[1807 rows x 6 columns]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import pandas as pd\n",
    "\n",
    "pd.read_parquet(\"data/driver_stats.parquet\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "rRL8-ubWzUFy"
   },
   "source": [
    "## Step 3: Register feature definitions and deploy your feature store\n",
    "\n",
    "`feast apply` scans python files in the current directory for feature/entity definitions and deploys infrastructure according to `feature_store.yaml`.\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "5NS4INL5n7ze"
   },
   "source": [
    "### Step 3a: Inspecting feature definitions\n",
    "Let's inspect what `example_repo.py` looks like:\n",
    "\n",
    "```python\n",
    "# This is an example feature definition file\n",
    "\n",
    "from datetime import timedelta\n",
    "\n",
    "import pandas as pd\n",
    "\n",
    "from feast import Entity, FeatureService, FeatureView, Field, FileSource, RequestSource, PushSource\n",
    "from feast.on_demand_feature_view import on_demand_feature_view\n",
    "from feast.types import Float32, Int64, Float64\n",
    "\n",
    "# Read data from parquet files. Parquet is convenient for local development mode. For\n",
    "# production, you can use your favorite DWH, such as BigQuery. See Feast documentation\n",
    "# for more info.\n",
    "driver_hourly_stats = FileSource(\n",
    "    name=\"driver_hourly_stats_source\",\n",
    "    path=\"/content/feature_repo/data/driver_stats.parquet\",\n",
    "    timestamp_field=\"event_timestamp\",\n",
    "    created_timestamp_column=\"created\",\n",
    ")\n",
    "\n",
    "# Define an entity for the driver. You can think of entity as a primary key used to\n",
    "# fetch features.\n",
    "driver = Entity(name=\"driver\", join_keys=[\"driver_id\"])\n",
    "\n",
    "# Our parquet files contain sample data that includes a driver_id column, timestamps and\n",
    "# three feature column. Here we define a Feature View that will allow us to serve this\n",
    "# data to our model online.\n",
    "driver_hourly_stats_view = FeatureView(\n",
    "    name=\"driver_hourly_stats\",\n",
    "    entities=[driver],\n",
    "    ttl=timedelta(days=1),\n",
    "    schema=[\n",
    "        Field(name=\"conv_rate\", dtype=Float32),\n",
    "        Field(name=\"acc_rate\", dtype=Float32),\n",
    "        Field(name=\"avg_daily_trips\", dtype=Int64),\n",
    "    ],\n",
    "    online=True,\n",
    "    source=driver_hourly_stats,\n",
    "    tags={},\n",
    ")\n",
    "\n",
    "# Defines a way to push data (to be available offline, online or both) into Feast.\n",
    "driver_stats_push_source = PushSource(\n",
    "    name=\"driver_stats_push_source\",\n",
    "    batch_source=driver_hourly_stats,\n",
    ")\n",
    "\n",
    "# Define a request data source which encodes features / information only\n",
    "# available at request time (e.g. part of the user initiated HTTP request)\n",
    "input_request = RequestSource(\n",
    "    name=\"vals_to_add\",\n",
    "    schema=[\n",
    "        Field(name=\"val_to_add\", dtype=Int64),\n",
    "        Field(name=\"val_to_add_2\", dtype=Int64),\n",
    "    ],\n",
    ")\n",
    "\n",
    "\n",
    "# Define an on demand feature view which can generate new features based on\n",
    "# existing feature views and RequestSource features\n",
    "@on_demand_feature_view(\n",
    "    sources=[driver_hourly_stats_view, input_request],\n",
    "    schema=[\n",
    "        Field(name=\"conv_rate_plus_val1\", dtype=Float64),\n",
    "        Field(name=\"conv_rate_plus_val2\", dtype=Float64),\n",
    "    ],\n",
    ")\n",
    "def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:\n",
    "    df = pd.DataFrame()\n",
    "    df[\"conv_rate_plus_val1\"] = inputs[\"conv_rate\"] + inputs[\"val_to_add\"]\n",
    "    df[\"conv_rate_plus_val2\"] = inputs[\"conv_rate\"] + inputs[\"val_to_add_2\"]\n",
    "    return df\n",
    "\n",
    "\n",
    "# This groups features into a model version\n",
    "driver_stats_fs = FeatureService(\n",
    "    name=\"driver_activity_v1\", features=[driver_hourly_stats_view, transformed_conv_rate]\n",
    ")\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "im_cc5HdoDno"
   },
   "source": [
    "### Step 3b: Applying feature definitions\n",
    "Now we run `feast apply` to register the feature views and entities defined in `example_repo.py`, and sets up SQLite online store tables. Note that we had previously specified SQLite as the online store in `feature_store.yaml` by specifying a `local` provider."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "RYKCKKrcxYZG",
    "outputId": "f34aa509-1dc6-4e50-e8ee-12897138f3b9"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "RuntimeWarning: On demand feature view is an experimental feature. This API is stable, but the functionality does not scale well for offline retrieval\n",
      "  warnings.warn(\n",
      "Created entity \u001b[1m\u001b[32mdriver\u001b[0m\n",
      "Created feature view \u001b[1m\u001b[32mdriver_hourly_stats\u001b[0m\n",
      "Created on demand feature view \u001b[1m\u001b[32mtransformed_conv_rate\u001b[0m\n",
      "Created feature service \u001b[1m\u001b[32mdriver_activity_v1\u001b[0m\n",
      "\n",
      "Created sqlite table \u001b[1m\u001b[32mfeature_repo_driver_hourly_stats\u001b[0m\n",
      "\n"
     ]
    }
   ],
   "source": [
    "!feast apply"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "uV7rtRQgzyf0"
   },
   "source": [
    "## Step 4: Generating training data or powering batch scoring models\n",
    "\n",
    "To train a model, we need features and labels. Often, this label data is stored separately (e.g. you have one table storing user survey results and another set of tables with feature values). Feast can help generate the features that map to these labels.\n",
    "\n",
    "Feast needs a list of **entities** (e.g. driver ids) and **timestamps**. Feast will intelligently join relevant \n",
    "tables to create the relevant feature vectors. There are two ways to generate this list:\n",
    "1. The user can query that table of labels with timestamps and pass that into Feast as an _entity dataframe_ for \n",
    "training data generation. \n",
    "2. The user can also query that table with a *SQL query* which pulls entities. See the documentation on [feature retrieval](https://docs.feast.dev/getting-started/concepts/feature-retrieval) for details    \n",
    "\n",
    "* Note that we include timestamps because we want the features for the same driver at various timestamps to be used in a model.\n",
    "\n",
    "### Step 4a: Generating training data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "C6Fzia7YwBzz",
    "outputId": "58c4c3dd-7a10-4f56-901d-1bb879ebbcb8"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "----- Feature schema -----\n",
      "\n",
      "<class 'pandas.core.frame.DataFrame'>\n",
      "RangeIndex: 3 entries, 0 to 2\n",
      "Data columns (total 10 columns):\n",
      " #   Column                              Non-Null Count  Dtype              \n",
      "---  ------                              --------------  -----              \n",
      " 0   driver_id                           3 non-null      int64              \n",
      " 1   event_timestamp                     3 non-null      datetime64[ns, UTC]\n",
      " 2   label_driver_reported_satisfaction  3 non-null      int64              \n",
      " 3   val_to_add                          3 non-null      int64              \n",
      " 4   val_to_add_2                        3 non-null      int64              \n",
      " 5   conv_rate                           3 non-null      float32            \n",
      " 6   acc_rate                            3 non-null      float32            \n",
      " 7   avg_daily_trips                     3 non-null      int32              \n",
      " 8   conv_rate_plus_val1                 3 non-null      float64            \n",
      " 9   conv_rate_plus_val2                 3 non-null      float64            \n",
      "dtypes: datetime64[ns, UTC](1), float32(2), float64(2), int32(1), int64(4)\n",
      "memory usage: 332.0 bytes\n",
      "None\n",
      "\n",
      "----- Example features -----\n",
      "\n",
      "   driver_id           event_timestamp  label_driver_reported_satisfaction  \\\n",
      "0       1001 2021-04-12 10:59:42+00:00                                   1   \n",
      "1       1002 2021-04-12 08:12:10+00:00                                   5   \n",
      "2       1003 2021-04-12 16:40:26+00:00                                   3   \n",
      "\n",
      "   val_to_add  val_to_add_2  conv_rate  acc_rate  avg_daily_trips  \\\n",
      "0           1            10   0.356766  0.051319               93   \n",
      "1           2            20   0.130452  0.359439              522   \n",
      "2           3            30   0.666570  0.343380              266   \n",
      "\n",
      "   conv_rate_plus_val1  conv_rate_plus_val2  \n",
      "0             1.356766            10.356766  \n",
      "1             2.130452            20.130452  \n",
      "2             3.666570            30.666570  \n"
     ]
    }
   ],
   "source": [
    "from datetime import datetime\n",
    "import pandas as pd\n",
    "\n",
    "from feast import FeatureStore\n",
    "\n",
    "# The entity dataframe is the dataframe we want to enrich with feature values\n",
    "# Note: see https://docs.feast.dev/getting-started/concepts/feature-retrieval for more details on how to retrieve\n",
    "# for all entities in the offline store instead\n",
    "entity_df = pd.DataFrame.from_dict(\n",
    "    {\n",
    "        # entity's join key -> entity values\n",
    "        \"driver_id\": [1001, 1002, 1003],\n",
    "        # \"event_timestamp\" (reserved key) -> timestamps\n",
    "        \"event_timestamp\": [\n",
    "            datetime(2021, 4, 12, 10, 59, 42),\n",
    "            datetime(2021, 4, 12, 8, 12, 10),\n",
    "            datetime(2021, 4, 12, 16, 40, 26),\n",
    "        ],\n",
    "        # (optional) label name -> label values. Feast does not process these\n",
    "        \"label_driver_reported_satisfaction\": [1, 5, 3],\n",
    "        # values we're using for an on-demand transformation\n",
    "        \"val_to_add\": [1, 2, 3],\n",
    "        \"val_to_add_2\": [10, 20, 30],\n",
    "    }\n",
    ")\n",
    "\n",
    "store = FeatureStore(repo_path=\".\")\n",
    "\n",
    "training_df = store.get_historical_features(\n",
    "    entity_df=entity_df,\n",
    "    features=[\n",
    "        \"driver_hourly_stats:conv_rate\",\n",
    "        \"driver_hourly_stats:acc_rate\",\n",
    "        \"driver_hourly_stats:avg_daily_trips\",\n",
    "        \"transformed_conv_rate:conv_rate_plus_val1\",\n",
    "        \"transformed_conv_rate:conv_rate_plus_val2\",\n",
    "    ],\n",
    ").to_df()\n",
    "\n",
    "print(\"----- Feature schema -----\\n\")\n",
    "print(training_df.info())\n",
    "\n",
    "print()\n",
    "print(\"----- Example features -----\\n\")\n",
    "print(training_df.head())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "GFiXVdhz04t0"
   },
   "source": [
    "### Step 4b: Run offline inference (batch scoring)\n",
    "To power a batch model, we primarily need to generate features with the `get_historical_features` call, but using the current timestamp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "rGR_xgIs04t0",
    "outputId": "3496e5a1-79ff-4f3c-e35d-22b594992708"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "----- Example features -----\n",
      "\n",
      "   driver_id                  event_timestamp  \\\n",
      "0       1001 2022-08-08 18:22:06.555018+00:00   \n",
      "1       1002 2022-08-08 18:22:06.555018+00:00   \n",
      "2       1003 2022-08-08 18:22:06.555018+00:00   \n",
      "\n",
      "   label_driver_reported_satisfaction  val_to_add  val_to_add_2  conv_rate  \\\n",
      "0                                   1           1            10   0.663844   \n",
      "1                                   5           2            20   0.151189   \n",
      "2                                   3           3            30   0.769165   \n",
      "\n",
      "   acc_rate  avg_daily_trips  conv_rate_plus_val1  conv_rate_plus_val2  \n",
      "0  0.864639              359             1.663844            10.663844  \n",
      "1  0.695982              311             2.151189            20.151189  \n",
      "2  0.949191              789             3.769165            30.769165  \n"
     ]
    }
   ],
   "source": [
    "entity_df[\"event_timestamp\"] = pd.to_datetime(\"now\", utc=True)\n",
    "training_df = store.get_historical_features(\n",
    "    entity_df=entity_df,\n",
    "    features=[\n",
    "        \"driver_hourly_stats:conv_rate\",\n",
    "        \"driver_hourly_stats:acc_rate\",\n",
    "        \"driver_hourly_stats:avg_daily_trips\",\n",
    "        \"transformed_conv_rate:conv_rate_plus_val1\",\n",
    "        \"transformed_conv_rate:conv_rate_plus_val2\",\n",
    "    ],\n",
    ").to_df()\n",
    "\n",
    "print(\"\\n----- Example features -----\\n\")\n",
    "print(training_df.head())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ngl7HCtmz3hG"
   },
   "source": [
    "## Step 5: Load features into your online store"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "KCXUpiQ_pmDk"
   },
   "source": [
    "### Step 5a: Using `materialize_incremental`\n",
    "\n",
    "We now serialize the latest values of features since the beginning of time to prepare for serving. Note, `materialize_incremental` serializes all new features since the last `materialize` call, or since the time provided minus the `ttl` timedelta. In this case, this will be `CURRENT_TIME - 1 day` (`ttl` was set on the `FeatureView` instances in [feature_repo/feature_repo/example_repo.py](feature_repo/feature_repo/example_repo.py)). \n",
    "\n",
    "```bash\n",
    "CURRENT_TIME=$(date -u +\"%Y-%m-%dT%H:%M:%S\")\n",
    "feast materialize-incremental $CURRENT_TIME\n",
    "```\n",
    "\n",
    "An alternative to using the CLI command is to use Python:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "7Z6QxIebAhK5",
    "outputId": "9b54777d-2dd8-4ec3-b4e7-e3275800a980"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Materializing \u001b[1m\u001b[32m1\u001b[0m feature views to \u001b[1m\u001b[32m2022-08-08 14:19:04-04:00\u001b[0m into the \u001b[1m\u001b[32msqlite\u001b[0m online store.\n",
      "\n",
      "\u001b[1m\u001b[32mdriver_hourly_stats\u001b[0m from \u001b[1m\u001b[32m2022-08-07 18:19:04-04:00\u001b[0m to \u001b[1m\u001b[32m2022-08-08 14:19:04-04:00\u001b[0m:\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████| 5/5 [00:00<00:00, 346.47it/s]\n"
     ]
    }
   ],
   "source": [
    "from datetime import datetime\n",
    "store.materialize_incremental(datetime.now())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "l7t12bhH4i9H"
   },
   "source": [
    "### Step 5b: Inspect materialized features\n",
    "\n",
    "Note that now there are `online_store.db` and `registry.db`, which store the materialized features and schema information, respectively."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "aVIgSYhI4cvR",
    "outputId": "3c60f99c-2471-4343-83ed-cc60a6a9c3b2"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "--- Data directory ---\n",
      "driver_stats.parquet online_store.db      registry.db\n",
      "\n",
      "--- Schema of online store ---\n",
      "['entity_key', 'feature_name', 'value', 'event_ts', 'created_ts']\n"
     ]
    }
   ],
   "source": [
    "print(\"--- Data directory ---\")\n",
    "!ls data\n",
    "\n",
    "import sqlite3\n",
    "import pandas as pd\n",
    "con = sqlite3.connect(\"data/online_store.db\")\n",
    "print(\"\\n--- Schema of online store ---\")\n",
    "print(\n",
    "    pd.read_sql_query(\n",
    "        \"SELECT * FROM feature_repo_driver_hourly_stats\", con).columns.tolist())\n",
    "con.close()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "AWcttaGalzAm"
   },
   "source": [
    "### Quick note on entity keys\n",
    "Note from the above command that the online store indexes by `entity_key`. \n",
    "\n",
    "[Entity keys](https://docs.feast.dev/getting-started/concepts/entity#entity-key) include a list of all entities needed (e.g. all relevant primary keys) to generate the feature vector. In this case, this is a serialized version of the `driver_id`. We use this later to fetch all features for a given driver at inference time."
   ]
  },
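  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the idea of an `entity_key` index concrete, here is a minimal sketch that does not touch the real store. It builds a toy in-memory SQLite table with the same column names as printed above and looks up every feature row for one key. The table name `toy_online_store`, the column types, the key bytes, and all values are made up for illustration; they do not reproduce how Feast actually serializes keys or values.\n",
    "\n",
    "```python\n",
    "import sqlite3\n",
    "\n",
    "# Toy table reusing the column names printed above (the types are assumptions).\n",
    "con = sqlite3.connect(':memory:')\n",
    "con.execute(\n",
    "    'CREATE TABLE toy_online_store ('\n",
    "    'entity_key BLOB, feature_name TEXT, value BLOB, event_ts TEXT, created_ts TEXT)'\n",
    ")\n",
    "\n",
    "# Made-up rows: in this toy layout, one row per (entity key, feature name) pair.\n",
    "toy_rows = [\n",
    "    (b'driver-1001', 'acc_rate', b'0.86', '2022-08-08', '2022-08-08'),\n",
    "    (b'driver-1001', 'conv_rate', b'0.66', '2022-08-08', '2022-08-08'),\n",
    "    (b'driver-1002', 'acc_rate', b'0.70', '2022-08-08', '2022-08-08'),\n",
    "]\n",
    "con.executemany('INSERT INTO toy_online_store VALUES (?, ?, ?, ?, ?)', toy_rows)\n",
    "\n",
    "# Fetch every feature stored under a single entity key.\n",
    "found = con.execute(\n",
    "    'SELECT feature_name, value FROM toy_online_store '\n",
    "    'WHERE entity_key = ? ORDER BY feature_name',\n",
    "    (b'driver-1001',),\n",
    ").fetchall()\n",
    "print(found)\n",
    "con.close()\n",
    "```\n",
    "\n",
    "Keying rows this way means a single lookup on the serialized key returns all features for that driver, which is the access pattern used for online inference."
   ]
  },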
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "GNecKOaI0J2Z"
   },
   "source": [
    "## Step 6: Fetching real-time feature vectors for online inference"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "TBFlKRsOAhK8"
   },
   "source": [
    "At inference time, we need to quickly read the latest feature values for different drivers (which otherwise might have existed only in batch sources) from the online feature store using `get_online_features()`. These feature vectors can then be fed to the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "a-PUsUWUxoH9",
    "outputId": "fc52dc04-db87-4f48-df36-d3941d485600"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'acc_rate': [0.86463862657547, 0.6959823369979858],\n",
      " 'avg_daily_trips': [359, 311],\n",
      " 'conv_rate_plus_val1': [1000.6638441681862, 1001.1511893719435],\n",
      " 'conv_rate_plus_val2': [2000.6638441681862, 2002.1511893719435],\n",
      " 'driver_id': [1001, 1002]}\n"
     ]
    }
   ],
   "source": [
    "from pprint import pprint\n",
    "from feast import FeatureStore\n",
    "\n",
    "store = FeatureStore(repo_path=\".\")\n",
    "\n",
    "feature_vector = store.get_online_features(\n",
    "    features=[\n",
    "        \"driver_hourly_stats:acc_rate\",\n",
    "        \"driver_hourly_stats:avg_daily_trips\",\n",
    "        \"transformed_conv_rate:conv_rate_plus_val1\",\n",
    "        \"transformed_conv_rate:conv_rate_plus_val2\",\n",
    "    ],\n",
    "    entity_rows=[\n",
    "        # {join_key: entity_value}\n",
    "        {\n",
    "            \"driver_id\": 1001,\n",
    "            \"val_to_add\": 1000,\n",
    "            \"val_to_add_2\": 2000,\n",
    "        },\n",
    "        {\n",
    "            \"driver_id\": 1002,\n",
    "            \"val_to_add\": 1001,\n",
    "            \"val_to_add_2\": 2002,\n",
    "        },\n",
    "    ],\n",
    ").to_dict()\n",
    "\n",
    "pprint(feature_vector)"
   ]
  },
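  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The dictionary returned by `to_dict()` is column-oriented: each key maps to a list with one value per entity row. As a minimal sketch of preparing it for a model that expects one row per driver, the snippet below transposes those columns into rows. The values are copied from the output above; `FEATURE_ORDER` and `to_rows` are hypothetical names, and the column order a real model needs depends on how it was trained.\n",
    "\n",
    "```python\n",
    "# Column-oriented result, copied from the output printed above.\n",
    "feature_vector = {\n",
    "    'acc_rate': [0.86463862657547, 0.6959823369979858],\n",
    "    'avg_daily_trips': [359, 311],\n",
    "    'driver_id': [1001, 1002],\n",
    "}\n",
    "\n",
    "# Hypothetical column order that a trained model might expect.\n",
    "FEATURE_ORDER = ['acc_rate', 'avg_daily_trips']\n",
    "\n",
    "\n",
    "def to_rows(columns, order):\n",
    "    # Transpose a dict of equal-length lists into one list per entity row.\n",
    "    return [list(row) for row in zip(*(columns[name] for name in order))]\n",
    "\n",
    "\n",
    "rows = to_rows(feature_vector, FEATURE_ORDER)\n",
    "print(rows)\n",
    "```\n",
    "\n",
    "Each inner list lines up with the `driver_id` at the same position, so predictions can be matched back to drivers by index."
   ]
  },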
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "SRY87OMBoK_Z"
   },
   "source": [
    "### Fetching features using feature services\n",
    "You can also use feature services to manage multiple features, and decouple feature view definitions and the features needed by end applications. The feature store can also be used to fetch either online or historical features using the same api below. More information can be found [here](https://docs.feast.dev/getting-started/concepts/feature-retrieval).\n",
    "\n",
    " The `driver_activity_v1` feature service pulls all features from the `driver_hourly_stats` feature view:\n",
    "\n",
    "```python\n",
    "driver_stats_fs = FeatureService(\n",
    "    name=\"driver_activity_v1\", features=[driver_hourly_stats_view]\n",
    ")\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/"
    },
    "id": "BrnAEKlPn9s8",
    "outputId": "45f7f075-5243-4fa7-dbd4-63c0c22a68cd"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'acc_rate': [0.86463862657547, 0.6959823369979858],\n",
      " 'avg_daily_trips': [359, 311],\n",
      " 'conv_rate': [0.6638441681861877, 0.15118937194347382],\n",
      " 'conv_rate_plus_val1': [1000.6638441681862, 1001.1511893719435],\n",
      " 'conv_rate_plus_val2': [2000.6638441681862, 2002.1511893719435],\n",
      " 'driver_id': [1001, 1002]}\n"
     ]
    }
   ],
   "source": [
    "from feast import FeatureStore\n",
    "feature_store = FeatureStore('.')  # Initialize the feature store\n",
    "\n",
    "feature_service = feature_store.get_feature_service(\"driver_activity_v1\")\n",
    "feature_vector = feature_store.get_online_features(\n",
    "    features=feature_service,\n",
    "    entity_rows=[\n",
    "        # {join_key: entity_value}\n",
    "        {\n",
    "            \"driver_id\": 1001,\n",
    "            \"val_to_add\": 1000,\n",
    "            \"val_to_add_2\": 2000,\n",
    "        },\n",
    "        {\n",
    "            \"driver_id\": 1002,\n",
    "            \"val_to_add\": 1001,\n",
    "            \"val_to_add_2\": 2002,\n",
    "        },\n",
    "    ],\n",
    ").to_dict()\n",
    "pprint(feature_vector)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "PvPOSPV904t7"
   },
   "source": [
    "## Step 7: Making streaming features available in Feast\n",
    "Feast does not directly ingest from streaming sources. Instead, Feast relies on a push-based model to push features into Feast. You can write a streaming pipeline that generates features, which can then be pushed to the offline store, the online store, or both (depending on your needs).\n",
    "\n",
    "This relies on the `PushSource` defined above. Pushing to this source will populate all dependent feature views with the pushed feature values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "uAg5xKDF04t7",
    "outputId": "8288b911-125f-4141-b286-f6f84bcb24ea"
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "--- Simulate a stream event ingestion of the hourly stats df ---\n",
      "   driver_id     event_timestamp             created  conv_rate  acc_rate  \\\n",
      "0       1001 2021-05-13 10:59:42 2021-05-13 10:59:42        1.0       1.0   \n",
      "\n",
      "   avg_daily_trips  \n",
      "0             1000  \n"
     ]
    }
   ],
   "source": [
    "from feast.data_source import PushMode\n",
    "\n",
    "print(\"\\n--- Simulate a stream event ingestion of the hourly stats df ---\")\n",
    "event_df = pd.DataFrame.from_dict(\n",
    "    {\n",
    "        \"driver_id\": [1001],\n",
    "        \"event_timestamp\": [\n",
    "            datetime(2021, 5, 13, 10, 59, 42),\n",
    "        ],\n",
    "        \"created\": [\n",
    "            datetime(2021, 5, 13, 10, 59, 42),\n",
    "        ],\n",
    "        \"conv_rate\": [1.0],\n",
    "        \"acc_rate\": [1.0],\n",
    "        \"avg_daily_trips\": [1000],\n",
    "    }\n",
    ")\n",
    "print(event_df)\n",
    "store.push(\"driver_stats_push_source\", event_df, to=PushMode.ONLINE_AND_OFFLINE)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "lg68gH2sy6H1"
   },
   "source": [
    "# Next steps\n",
    "\n",
    "- Read the [Concepts](https://docs.feast.dev/getting-started/concepts/) page to understand the Feast data model and architecture.\n",
    "- Check out our [Tutorials](https://docs.feast.dev/tutorials/tutorials-overview) section for more examples on how to use Feast.\n",
    "- Follow our [Running Feast with Snowflake/GCP/AWS](https://docs.feast.dev/how-to-guides/feast-snowflake-gcp-aws) guide for a more in-depth tutorial on using Feast.\n"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [],
   "name": "quickstart.ipynb",
   "provenance": []
  },
  "kernelspec": {
   "display_name": "Python 3.8.10 64-bit ('python-3.8')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.10"
  },
  "vscode": {
   "interpreter": {
    "hash": "7d634b9af180bcb32a446a43848522733ff8f5bbf0cc46dba1a83bede04bf237"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
